
feat: AsyncPipeline that can schedule components to run concurrently #8812

Merged

merged 99 commits into main from feat/async_pipeline on Feb 7, 2025

Conversation

mathislucka
Member

@mathislucka mathislucka commented Feb 4, 2025

Related Issues

Proposed Changes:

Implements an AsyncPipeline that supports:

  • running pipelines asynchronously
  • step-by-step execution through an async generator
  • concurrent execution of components whenever possible (e.g. hybrid retrieval, multiple generators that can run in parallel)
  • sync run-method with concurrent execution of components
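The scheduling idea behind these points can be illustrated with plain asyncio, independent of Haystack (the retriever names and timings below are made up for illustration): two independent steps run concurrently, and a dependent step runs only once both of its inputs are ready, which is the hybrid-retrieval shape mentioned above.

```python
import asyncio
import time

async def bm25_retriever(query: str) -> list:
    # Simulate an I/O-bound sparse retrieval step.
    await asyncio.sleep(0.1)
    return [f"bm25:{query}"]

async def embedding_retriever(query: str) -> list:
    # Simulate an I/O-bound dense retrieval step.
    await asyncio.sleep(0.1)
    return [f"dense:{query}"]

async def hybrid_retrieval(query: str) -> list:
    # Independent components are scheduled concurrently ...
    sparse, dense = await asyncio.gather(
        bm25_retriever(query), embedding_retriever(query)
    )
    # ... and the joining component runs once both have finished.
    return sparse + dense

start = time.perf_counter()
docs = asyncio.run(hybrid_retrieval("test"))
elapsed = time.perf_counter() - start
print(docs)  # ['bm25:test', 'dense:test']
# elapsed is ~0.1s (the slower branch), not 0.2s sequential
```

Because both retrievers overlap, the total latency tracks the slowest branch rather than the sum of the branches.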

How did you test it?

  • unit tests
  • adapted behavioral tests to use Pipeline and AsyncPipeline

Notes for the reviewer

Review after #8707
Code was reviewed here before: deepset-ai/haystack-experimental#180

Checklist

  • I have read the contributors guidelines and the code of conduct
  • I have updated the related issue with new insights and changes
  • I added unit tests and updated the docstrings
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.
  • I documented my code
  • I ran pre-commit hooks and fixed any issue

@mathislucka mathislucka requested review from davidsbatista and Amnah199 and removed request for vblagoje February 6, 2025 15:34
@mathislucka
Member Author

@Amnah199 @davidsbatista much smaller diff now that the other PR is merged.

This is largely the same as the PR that we already merged to experimental with the following differences:

  • fixed bug where we didn't wait long enough for DEFER(_LAST)
  • added pipeline type to telemetry

@@ -23,6 +23,7 @@
"default_to_dict",
"DeserializationError",
"ComponentError",
"AsyncPipeline",
Contributor

(nit) suggestion: keeping these exports ordered alphabetically makes entries easier to locate as the list grows

__all__ = [
    "Answer",
    "AsyncPipeline",
    "ComponentError",
    "DeserializationError",
    "Document",
    "ExtractedAnswer",
    "GeneratedAnswer",
    "Pipeline",
    "PredefinedPipeline",
    "component",
    "default_from_dict",
    "default_to_dict",
]

@davidsbatista
Contributor

I did another quick review, although most of this was already reviewed before.

From my side it's approved, but to play it safe, let's wait for Amna to also do another quick review before merging.

Contributor

@davidsbatista davidsbatista left a comment

LGTM

Comment on lines 12 to 14
async_loop = asyncio.new_event_loop()
asyncio.set_event_loop(async_loop)

Contributor

Here as well, we can avoid manual handling of loops by using asyncio.run, if you feel that would be better.
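The suggestion can be sketched with a minimal, Haystack-free example (the `run_all` coroutine below is made up): `asyncio.run` creates, runs, and closes the event loop in one call, replacing the manual `new_event_loop` / `set_event_loop` / `close` sequence.

```python
import asyncio

async def run_all() -> list:
    # Stand-in for the pipeline runs gathered in the real test.
    async def work(n: int) -> int:
        await asyncio.sleep(0)
        return n * 2
    return await asyncio.gather(work(1), work(2))

# Manual handling (what the test currently does):
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# try:
#     results = loop.run_until_complete(run_all())
# finally:
#     loop.close()

# Equivalent one-liner; the loop is created and closed for us:
results = asyncio.run(run_all())
print(results)  # [2, 4]
```

Note that `asyncio.run` cannot be called from within an already-running event loop, which is usually fine in tests.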

Comment on lines 6 to 26
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
    pp = AsyncPipeline()
    pp.add_component("wait", waiting_component())

    run_data = [{"wait_for": 1}, {"wait_for": 2}]

    async_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(async_loop)

    async def run_all():
        # Create concurrent tasks for each pipeline run
        tasks = [pp.run_async(data) for data in run_data]
        await asyncio.gather(*tasks)

    try:
        async_loop.run_until_complete(run_all())
        component_spans = [sp for sp in spying_tracer.spans if sp.operation_name == "haystack.component.run_async"]
        for span in component_spans:
            assert span.tags["haystack.component.visits"] == 1
    finally:
        async_loop.close()
Contributor

@Amnah199 Amnah199 Feb 7, 2025

Something like this? (Although I didn't test it.)

Suggested change
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
    pp = AsyncPipeline()
    pp.add_component("wait", waiting_component())

    run_data = [{"wait_for": 1}, {"wait_for": 2}]

    async_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(async_loop)

    async def run_all():
        # Create concurrent tasks for each pipeline run
        tasks = [pp.run_async(data) for data in run_data]
        await asyncio.gather(*tasks)

    try:
        async_loop.run_until_complete(run_all())
        component_spans = [sp for sp in spying_tracer.spans if sp.operation_name == "haystack.component.run_async"]
        for span in component_spans:
            assert span.tags["haystack.component.visits"] == 1
    finally:
        async_loop.close()
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
    """
    Test that the AsyncPipeline can execute multiple runs concurrently and that
    each component is called exactly once per run (as indicated by the 'visits' tag).
    """
    async_pipeline = AsyncPipeline()
    async_pipeline.add_component("wait", waiting_component())

    run_data = [{"wait_for": 1}, {"wait_for": 2}]

    async def run_all():
        tasks = [async_pipeline.run_async(data) for data in run_data]
        await asyncio.gather(*tasks)

    # Use asyncio.run to manage the event loop.
    asyncio.run(run_all())

    component_spans = [
        sp for sp in spying_tracer.spans
        if sp.operation_name == "haystack.component.run_async"
    ]
    for span in component_spans:
        expected_visits = 1
        actual_visits = span.tags.get("haystack.component.visits")
        assert actual_visits == expected_visits, (
            f"Expected {expected_visits} visit, got {actual_visits} for span {span}"
        )

Contributor

@Amnah199 Amnah199 left a comment

LGTM! Thanks again @mathislucka.
Much appreciated!

@mathislucka mathislucka merged commit e5b9bde into main Feb 7, 2025
18 checks passed
@mathislucka mathislucka deleted the feat/async_pipeline branch February 7, 2025 15:37
@alex-stoica

Excellent work @mathislucka 🚀! I've been waiting a long time for this feature! I hope the documentation will be updated soon as well.
I've personally tested the implementation with code slightly adapted from deepset-ai/haystack-experimental#144, and it works as expected: I see both pipeline-level and component-level concurrency.

import asyncio
from haystack import AsyncPipeline
from haystack import component
from datetime import datetime

def print_with_prefix(pipeline_prefix: str, component_name: str, message: str):
    """Prints a message prefixed with the pipeline and component name."""
    print(f"[{pipeline_prefix}] {component_name} {message} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

async def async_sleep_task(duration: int):
    await asyncio.sleep(duration)

@component
class ComponentA:
    @component.output_types(A_output=str)
    def run(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentA", "run started")
        result = {"A_output": f"Processed by A: {dummy}"}
        print_with_prefix(prefix, "ComponentA", "run ended")
        return result

    @component.output_types(A_output=str)
    async def run_async(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentA", "run_async started")
        await async_sleep_task(3)
        result = {"A_output": f"Processed by A: {dummy}"}
        print_with_prefix(prefix, "ComponentA", "run_async ended")
        return result

@component
class ComponentB:
    @component.output_types(B_output=str)
    def run(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentB", "run started")
        result = {"B_output": f"Processed by B: {dummy}"}
        print_with_prefix(prefix, "ComponentB", "run ended")
        return result

    @component.output_types(B_output=str)
    async def run_async(self, dummy: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentB", "run_async started")
        await async_sleep_task(2)
        result = {"B_output": f"Processed by B: {dummy}"}
        print_with_prefix(prefix, "ComponentB", "run_async ended")
        return result

@component
class ComponentC:
    @component.output_types(C_output=str)
    def run(self, A_output: str, B_output: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentC", "run started")
        result = {"C_output": f"C combined outputs: {A_output}, {B_output}"}
        print_with_prefix(prefix, "ComponentC", "run ended")
        return result

    @component.output_types(C_output=str)
    async def run_async(self, A_output: str, B_output: str) -> dict:
        prefix = getattr(self, 'pipeline_name', 'Unknown')
        print_with_prefix(prefix, "ComponentC", "run_async started")
        await async_sleep_task(1)
        result = {"C_output": f"C combined outputs: {A_output}, {B_output}"}
        print_with_prefix(prefix, "ComponentC", "run_async ended")
        return result

def create_pipeline(name: str):
    pipeline = AsyncPipeline()
    pipeline.name = name
    
    comp_a = ComponentA()
    comp_b = ComponentB()
    comp_c = ComponentC()

    comp_a.pipeline_name = name
    comp_b.pipeline_name = name
    comp_c.pipeline_name = name

    pipeline.add_component("A", comp_a)
    pipeline.add_component("B", comp_b)
    pipeline.add_component("C", comp_c)
    pipeline.connect("A.A_output", "C.A_output")
    pipeline.connect("B.B_output", "C.B_output")
    return pipeline

if __name__ == "__main__":
    async def run_pipeline(pipeline, input_data):
        output = await pipeline.run_async(input_data)
        print(f"[{pipeline.name}] Pipeline output: {output}")

    async def main():
        input_data1 = {"dummy": "Test data 1"}
        input_data2 = {"dummy": "Test data 2"}
        pipeline1 = create_pipeline("P1")
        pipeline2 = create_pipeline("P2")
        # Run both pipelines concurrently.
        task1 = asyncio.create_task(run_pipeline(pipeline1, input_data1))
        task2 = asyncio.create_task(run_pipeline(pipeline2, input_data2))
        await asyncio.gather(task1, task2)

    asyncio.run(main())

blue-gitty pushed a commit to blue-gitty/haystack that referenced this pull request Feb 9, 2025
…eepset-ai#8812)

* add component checks

* pipeline should run deterministically

* add FIFOQueue

* add agent tests

* add order dependent tests

* run new tests

* remove code that is not needed

* test: intermediate from cycle outputs are available outside cycle

* add tests for component checks (Claude)

* adapt tests for component checks (o1 review)

* chore: format

* remove tests that aren't needed anymore

* add _calculate_priority tests

* revert accidental change in pyproject.toml

* test format conversion

* adapt to naming convention

* chore: proper docstrings and type hints for PQ

* format

* add more unit tests

* rm unneeded comments

* test input consumption

* lint

* fix: docstrings

* lint

* format

* format

* fix license header

* fix license header

* add component run tests

* fix: pass correct input format to tracing

* fix types

* format

* format

* types

* add defaults from Socket instead of signature

- otherwise components with dynamic inputs would fail

* fix test names

* still wait for optional inputs on greedy variadic sockets

- mirrors previous behavior

* fix format

* wip: warn for ambiguous running order

* wip: alternative warning

* fix license header

* make code more readable

Co-authored-by: Amna Mubashar <[email protected]>

* Introduce content tracing to a behavioral test

* Fixing linting

* Remove debug print statements

* Fix tracer tests

* remove print

* test: test for component inputs

* test: remove testing for run order

* chore: update component checks from experimental

* chore: update pipeline and base from experimental

* refactor: remove unused method

* refactor: remove unused method

* refactor: outdated comment

* refactor: inputs state is updated as side effect

- to prepare for AsyncPipeline implementation

* format

* test: add file conversion test

* format

* fix: original implementation deepcopies outputs

* lint

* fix: from_dict was updated

* fix: format

* fix: test

* test: add test for thread safety

* remove unused imports

* format

* test: FIFOPriorityQueue

* chore: add release note

* feat: add AsyncPipeline

* chore: Add release notes

* fix: format

* debug: switch run order to debug ubuntu and windows tests

* fix: consider priorities of other components while waiting for DEFER

* refactor: simplify code

* fix: resolve merge conflict with mermaid changes

* fix: format

* fix: remove unused import

* refactor: rename to avoid accidental conflicts

* fix: track pipeline type

* fix: and extend test

* fix: format

* style: sort alphabetically

* Update test/core/pipeline/features/conftest.py

Co-authored-by: Amna Mubashar <[email protected]>

* Update test/core/pipeline/features/conftest.py

Co-authored-by: Amna Mubashar <[email protected]>

* Update releasenotes/notes/feat-async-pipeline-338856a142e1318c.yaml

* fix: indentation, do not close loop

* fix: use asyncio.run

* fix: format

---------

Co-authored-by: Amna Mubashar <[email protected]>
Co-authored-by: David S. Batista <[email protected]>
@mathislucka
Member Author

Thanks @alex-stoica! Yes, we will have updated documentation for the 2.10 release. I'm glad you find it useful and that it works as expected.

Since this is quite a complex feature, please let me know if you find anything that doesn't work as you would expect.

Development

Successfully merging this pull request may close these issues.

components should run concurrently when not explicitly waiting on inputs
5 participants